<!DOCTYPE html>


<html lang="zh-CN">


<head>
  <meta charset="utf-8" />
    
  <meta name="description" content="迎着朝阳的博客" />
  
  <meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=1" />
  <title>
    kafka学习笔记（二） |  迎着朝阳
  </title>
  <meta name="generator" content="hexo-theme-ayer">
  
  <link rel="shortcut icon" href="https://dxysun.com/static/yan.png" />
  
  
<link rel="stylesheet" href="/dist/main.css">

  
<link rel="stylesheet" href="https://cdn.jsdelivr.net/gh/Shen-Yu/cdn/css/remixicon.min.css">

  
<link rel="stylesheet" href="/css/custom.css">

  
  
<script src="https://cdn.jsdelivr.net/npm/pace-js@1.0.2/pace.min.js"></script>

  
  

  
<script>
var _hmt = _hmt || [];
(function() {
	var hm = document.createElement("script");
	hm.src = "https://hm.baidu.com/hm.js?aa994a8d65700b8835787dd39d079d7e";
	var s = document.getElementsByTagName("script")[0]; 
	s.parentNode.insertBefore(hm, s);
})();
</script>


</head>

</html>

<body>
  <div id="app">
    
      
    <main class="content on">
      <section class="outer">
  <article
  id="post-javaForKafka2"
  class="article article-type-post"
  itemscope
  itemprop="blogPost"
  data-scroll-reveal
>
  <div class="article-inner">
    
    <header class="article-header">
       
<h1 class="article-title sea-center" style="border-left:0" itemprop="name">
  kafka学习笔记（二）
</h1>
 

    </header>
     
    <div class="article-meta">
      <a href="/2019/10/27/javaForKafka2/" class="article-date">
  <time datetime="2019-10-27T07:47:22.000Z" itemprop="datePublished">2019-10-27</time>
</a>   
<div class="word_count">
    <span class="post-time">
        <span class="post-meta-item-icon">
            <i class="ri-quill-pen-line"></i>
            <span class="post-meta-item-text"> 字数统计:</span>
            <span class="post-count">4.5k</span>
        </span>
    </span>

    <span class="post-time">
        &nbsp; | &nbsp;
        <span class="post-meta-item-icon">
            <i class="ri-book-open-line"></i>
            <span class="post-meta-item-text"> 阅读时长≈</span>
            <span class="post-count">16 分钟</span>
        </span>
    </span>
</div>
 
    </div>
      
    <div class="tocbot"></div>




  
    <div class="article-entry" itemprop="articleBody">
       
  <p>kafka学习笔记（二）</p>
<a id="more"></a>

<h1 id="核心概念"><a href="#核心概念" class="headerlink" title="核心概念"></a>核心概念</h1><h2 id="Topic"><a href="#Topic" class="headerlink" title="Topic"></a>Topic</h2><p>在 kafka 中，topic 是一个存储消息的逻辑概念，可以认为是一个消息集合。每条消息发送到 kafka 集群的消息都有<br>一个类别，这个类别就是Topic。物理上来说，不同的 topic 的消息是分开存储的，每个 topic 可以有多个生产者向它发送消息，也可以有多<br>个消费者去消费其中的消息。如图所示<br><img src="https://blog-dxysun.nos-eastchina1.126.net/topic.png" alt="topic"></p>
<h2 id="Partition"><a href="#Partition" class="headerlink" title="Partition"></a>Partition</h2><p>每个 topic 可以划分多个分区（每个 Topic 至少有一个分区），同一 topic 下的不同分区包含的消息是不同的。每个<br>消息在被添加到分区时，都会被分配一个 offset（称之为偏移量），它是消息在此分区中的唯一编号，kafka 通过 offset<br>保证消息在分区内的顺序，offset 的顺序性不跨分区，即 kafka只保证在同一个分区内的消息是有序的，同一个Topic的多个分区内的消息，kafka不保证其顺序性。<br><img src="https://blog-dxysun.nos-eastchina1.126.net/partition.png" alt="part"></p>
<h2 id="Topic-amp-Partition-的存储"><a href="#Topic-amp-Partition-的存储" class="headerlink" title="Topic&amp;Partition 的存储"></a>Topic&amp;Partition 的存储</h2><p>Partition 是以文件的形式存储在文件系统中，比如创建一个名为 firstTopic 的 topic，其中有 3 个 partition，那么在<br>kafka 的数据目录（/tmp/kafka-log）中就有 3 个目录，firstTopic-0~2，命名规则是<code>&lt;topic_name&gt;-&lt;partition_id&gt;</code></p>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><span class="line">1</span><br></pre></td><td class="code"><pre><span class="line">bin&#x2F;kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 3 --topic firstTopic</span><br></pre></td></tr></table></figure>

<h2 id="Broker"><a href="#Broker" class="headerlink" title="Broker"></a>Broker</h2><p>一个单独的Kafka server就是一个Broker。Broker的主要工作就是接受生产者发过来的消息，分配offset，之后保存到磁盘中；同时接受消费者、其他Broker的请求，根据请求类型进行相应处理并返回响应。<br>在一般的生产环境中，一个Broker独占一台物理服务器。</p>
<h1 id="消息分发"><a href="#消息分发" class="headerlink" title="消息分发"></a>消息分发</h1><h2 id="kafka-消息分发策略"><a href="#kafka-消息分发策略" class="headerlink" title="kafka 消息分发策略"></a>kafka 消息分发策略</h2><p>消息是 kafka 中最基本的数据单元，在 kafka 中，一条消息由 key、value 两部分构成，在发送一条消息时，我们可以指定这个 key，那么 producer 会根据 key 和 partition 机<br>制来判断当前这条消息应该发送并存储到哪个 partition 中。我们可以根据需要进行扩展 producer 的 partition 机制。</p>
<h2 id="消息默认的分发机制"><a href="#消息默认的分发机制" class="headerlink" title="消息默认的分发机制"></a>消息默认的分发机制</h2><p>默认情况下，kafka 采用的是 hash 取模的分区算法。如果Key 为 null，则会随机分配一个分区。这个随机是在这个参<br>数”metadata.max.age.ms”的时间范围内随机选择一个。对于这个时间段内，如果 key 为 null，则只会发送到唯一的分区。<br>这个值默认情况下是 10 分钟更新一次。<br>Metadata是Topic/Partition 和 broker 的映射关系，每一个 topic 的每一个 partition，需要知道对应的 broker 列表是什么，leader是谁，follower 是谁。<br>这些信息都是存储在 Metadata 这个类里面。</p>
<p>消费端消费是也可以指定分区，这样它就不会接收其他分区的数据</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br></pre></td><td class="code"><pre><span class="line"><span class="comment">//消费指定的0号分区</span></span><br><span class="line">TopicPartition topicPartition = <span class="keyword">new</span> TopicPartition(topic,<span class="number">0</span>);</span><br></pre></td></tr></table></figure>

<h1 id="消息的消费原理"><a href="#消息的消费原理" class="headerlink" title="消息的消费原理"></a>消息的消费原理</h1><h2 id="Consumer-Group"><a href="#Consumer-Group" class="headerlink" title="Consumer Group"></a>Consumer Group</h2><p>在实际生产过程中，每个 topic 都会有多个 partitions，多个 partitions 的好处在于，一方面能够对 broker 上的数据进行分片有效减少了消息的容量从而提升 io 性能。另外一方面，为了提高消费端的消费能力，一般会通过多个consumer 去消费同一个 topic ，也就是消费端的负载均衡机制。<br>在多个 partition 以及多个 consumer 的情况下，消费者是如何消费消息的，kafka存在 Consumer group消费组的概念 ，也就是 group.id 一样的consumer ，这些consumer属于一个消费组，一个consumer只能属于一个消费组，消费组保证其订阅的Topic的每个分区只被分配给该消费组下的一个消费者消费。如果不同的消费组订阅了同一个Topic，消费组之间互不干扰。<br>如果要实现一个消息可以被多个消费者同时消费（广播）的效果，则将每个消费者放入单独的一个Consumer group消费组中。<br>如果要实现一个消息只被一个消费者消费（独占）的效果，则将所有的消费者放入一个Consumer group消费组中。</p>
<h2 id="分区分配策略"><a href="#分区分配策略" class="headerlink" title="分区分配策略"></a>分区分配策略</h2><p>同一个Consumer group中的消费者对于一个topic中的多个 partition，存在一定的分区分配策略。<br>在 kafka 中，存在两种分区分配策略，一种是 Range（范围分区，默认）、另一种是 RoundRobin（轮询）。 通过partition.assignment.strategy 这个参数来设置。</p>
<h3 id="Range-strategy（范围分区）"><a href="#Range-strategy（范围分区）" class="headerlink" title="Range strategy（范围分区）"></a>Range strategy（范围分区）</h3><p>Range 策略是对每个Topic而言的，首先对同一个Topic里面的分区按照序号进行排序，并对消费者按照字母顺序进行排序。假设有10个分区，3个消费者，排完序的分区将会是 0, 1, 2, 3, 4, 5, 6, 7, 8, 9；消费者线程排完序将会是C1-0, C2-0, C3-0。然后将 partitions 的个数除于消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽，那么前面几个消费者线程将会多消费一个分区。在例子里面，有10个分区，3个消费者线程，10/3=3，而且除不尽，那么消费者线程 C1-0 将会多消费一个分区，所以最后分区分配的结果看起来是这样的：<br>C1-0 将消费 0, 1, 2, 3 分区<br>C2-0 将消费 4, 5, 6 分区<br>C3-0 将消费 7, 8, 9 分区</p>
<p>假如有11个分区，那么最后分区分配的结果看起来是这样的：<br>C1-0 将消费 0, 1, 2, 3 分区<br>C2-0 将消费 4, 5, 6, 7 分区<br>C3-0 将消费 8, 9, 10 分区</p>
<p>假如有 2 个Topic(T1 和 T2)，分别有 10 个分区，那么最后分区分配的结果看起来是这样的：<br>C1-0 将消费 T1 Topic的 0, 1, 2, 3 分区以及 T2 Topic的 0,1, 2, 3 分区<br>C2-0 将消费 T1 Topic的 4, 5, 6 分区以及 T2 Topic的 4, 5,6 分区<br>C3-0 将消费 T1 Topic的 7, 8, 9 分区以及 T2 Topic的 7, 8,9 分区</p>
<p>可以看出，C1-0 消费者线程比其他消费者线程多消费了2个分区，这就是Range strategy的一个很明显的弊端</p>
<h3 id="RoundRobin-strategy（轮询分区）"><a href="#RoundRobin-strategy（轮询分区）" class="headerlink" title="RoundRobin strategy（轮询分区）"></a>RoundRobin strategy（轮询分区）</h3><p>轮询分区策略是把所有 partition 和所有 consumer 线程都列出来，然后按照 hashcode 进行排序。最后通过轮询算法分配 partition 给消费线程。如果所有 consumer 实例的订阅是相同的，那么 partition 会均匀分布。<br>例如Topic T1有10个分区，4个消费者，假如按照 hashCode 排序完的 topic partitions 组依次为 T1-0, T1-1, T1-2, T1-3, T1-4, T1-5, T1-6, T1-7, T1-8, T1-9，消费者线程排序为 C1-0, C2-0, C3-0, C4-0，最后分区分配的结果为：<br>C1-0 将消费 T1-0, T1-4, T1-8 分区；<br>C2-0 将消费 T1-1, T1-5, T1-9 分区；<br>C3-0 将消费 T1-2, T1-6 分区；<br>C4-0 将消费 T1-3, T1-7 分区；</p>
<p>使用轮询分区策略必须满足两个条件</p>
<ol>
<li>同一个Consumer Group里面的所有消费者的num.streams必须相等</li>
<li>每个消费者订阅的主题必须是相同的</li>
</ol>
<h3 id="分区策略触发"><a href="#分区策略触发" class="headerlink" title="分区策略触发"></a>分区策略触发</h3><p>当出现以下几种情况时，kafka 会进行一次分区分配操作，也就是 kafka consumer 的 rebalance</p>
<ol>
<li>同一个 consumer group 内新增了消费者</li>
<li>消费者离开当前所属的 consumer group，比如主动停机或者宕机</li>
<li>topic 新增了分区（也就是分区数量发生了变化）</li>
</ol>
<p>kafka consuemr 的 rebalance 机制规定了一个 consumer group 下的所有 consumer 如何达成一致来分配订阅 topic 的每个分区。而具体如何执行分区策略，就是上述提到的两种内置的分区策略。而 kafka 对于分配策略这块，提供了可插拔的实现方式，也就是说，除了这两种之外，还可以创建自己的分配机制。</p>
<h3 id="Rebalance的执行-以及管理consumer的group"><a href="#Rebalance的执行-以及管理consumer的group" class="headerlink" title="Rebalance的执行 以及管理consumer的group"></a>Rebalance的执行 以及管理consumer的group</h3><p>Kafka 提供了一个角色：coordinator 来执行对于consumer group的管理，当consumer group 的第一个consumer启动的时候，它会去和broker确定谁是它们组的coordinator，之后该group内的所有成员都会和该coordinator进行协调通信。</p>
<h4 id="确定-coordinator"><a href="#确定-coordinator" class="headerlink" title="确定 coordinator"></a>确定 coordinator</h4><p>consumer group如何确定自己的 coordinator, 消费者向kafka集群中的任意一个broker发送一个GroupCoordinatorRequest 请求，服务端会返回一个负载最小的broker节点的id，并将该broker设置为coordinator。</p>
<h4 id="JoinGroup-的过程"><a href="#JoinGroup-的过程" class="headerlink" title="JoinGroup 的过程"></a>JoinGroup 的过程</h4><p>在 rebalance 之前，需要保证 coordinator 是已经确定好了的，整个rebalance的过程分为两个步骤，Join 和 Sync<br>join: 表示加入到 consumer group 中，在这一步中，所有的成员都会向 coordinator 发送 joinGroup 的请求。一旦所有成员都发送了 joinGroup 请求，那么 coordinator 会选择一个consumer担任leader 角色，并把组成员信息和订阅信息发送消费者</p>
<p>syn：完成分区分配之后，就进入了Synchronizing Group State阶段，主要逻辑是向 GroupCoordinator 发送SyncGroupRequest 请求，并且处理 SyncGroupResponse 响应，简单来说，就是leader将消费者对应的 partition 分配方案通过coordinator同步给consumer group中的所有consumer</p>
<p>每个消费者都会向coordinator 发送sync group请求，不过只有leader节点会发送分配方案，其他消费者只是打打酱油而已。当leader把方案发给 coordinator 以后，coordinator 会把结果设置到 SyncGroupResponse中。<br>这样所有成员都知道自己应该消费哪个分区。</p>
<p>consumer group 的分区分配方案是在客户端执行的，Kafka将这个权利下放给客户端主要是因为这样做可以有更好的灵活性。</p>
<h2 id="保存消费端的消费位置"><a href="#保存消费端的消费位置" class="headerlink" title="保存消费端的消费位置"></a>保存消费端的消费位置</h2><h3 id="offset"><a href="#offset" class="headerlink" title="offset"></a>offset</h3><p>每个topic可以划分多个分区（每个Topic至少有一个分区），同一topic下的不同分区包含的消息是不同的。每个消息在被添加到分区时，都会被分配一个offset（称之为偏移量），它是消息在此分区中的唯一编号，kafka 通过 offset 保证消息在分区内的顺序，offset的顺序不跨分区，即kafka 只保证在同一个分区内的消息是有序的；对于应用层的消费来说，每次消费一个消息并且提交以后，会保存当前消费到的最近的一个 offset。</p>
<h3 id="offset-保存位置"><a href="#offset-保存位置" class="headerlink" title="offset 保存位置"></a>offset 保存位置</h3><p>在 kafka 中，提供了一个__consumer_offsets_* 的一个topic，把 offset信息写入到这个topic中。<br>__consumer_offsets保存了每个consumer group某一时刻提交的offset信息。 __consumer_offsets 默认有50 个分区。 </p>
<p>可以根据groupid确定consumer_group保存在哪个分区中，计算公式<code>Math.abs(&quot;groupid&quot;.hashCode())%groupMetadataTopicPartitionCount</code>，默认情况下groupMetadataTopicPartitionCount有50个分区</p>
<h1 id="消息的存储"><a href="#消息的存储" class="headerlink" title="消息的存储"></a>消息的存储</h1><h2 id="消息的保存路径"><a href="#消息的保存路径" class="headerlink" title="消息的保存路径"></a>消息的保存路径</h2><p>kafka 是使用日志文件的方式来保存生产者和发送者的消息，每条消息都有一个offset值来表示它在分区中的偏移量。Kafka 中存储的一般都是海量的消息数据，为了避免日志文件过大，Log 并不是直接对应在一个磁盘上的日志文件，而是对应磁盘上的一个目录，这个目录的明明规则是<code>(topic_name)_(partition_id)</code><br>比如创建一个名为 firstTopic 的 topic，其中有 3 个 partition，那么在 kafka 的数据目录（/tmp/kafka-log）中就有 3 个目录，firstTopic-0~2</p>
<h2 id="多个分区在集群中的分配"><a href="#多个分区在集群中的分配" class="headerlink" title="多个分区在集群中的分配"></a>多个分区在集群中的分配</h2><p>如果对于一个 topic，在集群中创建多个partition，那么partition是如何分布的</p>
<ol>
<li>将所有 N 个Broker 和待分配的 i 个Partition排序</li>
<li>将第 i 个Partition分配到第(i mod n)个Broker上</li>
</ol>
<h2 id="消息写入的性能"><a href="#消息写入的性能" class="headerlink" title="消息写入的性能"></a>消息写入的性能</h2><p>现在大部分企业仍然用的是机械结构的磁盘，如果把消息以随机的方式写入到磁盘，那么磁盘首先要做的就是寻址，也就是定位到数据所在的物理地址，在磁盘上就要找到对应的柱面、磁头以及对应的扇区；这个过程相对内存来说会消耗大量时间，为了规避随机读写带来的时间消耗，kafka 采用顺序写的方式存储数据。即使是这样，但是频繁的 I/O 操作仍然会造成磁盘的性能瓶颈，所以 kafka 还有一个性能策略</p>
<h3 id="零拷贝"><a href="#零拷贝" class="headerlink" title="零拷贝"></a>零拷贝</h3><p>消息从发送到落地保存，broker 维护的消息日志本身就是文件目录，每个文件都是二进制保存，生产者和消费者使用相同的格式来处理。在消费者获取消息时，服务器先从硬盘读取数据到内存，然后把内存中的数据原封不动的通过 socket 发送给消费者，这个操作上经历了很多步骤。<br><img src="https://blog-dxysun.nos-eastchina1.126.net/infowrite.png" alt="info"></p>
<ul>
<li>操作系统将数据从磁盘读入到内核空间的页缓存</li>
<li>应用程序将数据从内核空间读入到用户空间缓存中</li>
<li>应用程序将数据写回到内核空间到 socket 缓存中</li>
<li>操作系统将数据从 socket 缓冲区复制到网卡缓冲区，以便将数据经网络发出</li>
</ul>
<p>这个过程涉及到 4 次上下文切换以及 4 次数据复制，并且有两次复制操作是由 CPU 完成。但是这个过程中，数据完全没有进行变化，仅仅是从磁盘复制到网卡缓冲区。<br>通过”零拷贝”技术，可以去掉这些没必要的数据复制操作，同时也会减少上下文切换次数。现代的 unix 操作系统提供一个优化的代码路径，用于将数据从页缓存传输到 socket;<br>在 Linux 中，是通过 sendfile 系统调用来完成的。Java 提供了访问这个系统调用的方法<code>FileChannel.transferTo API</code><br>使用 sendfile，只需要一次拷贝就行，允许操作系统将数据直接从页缓存发送到网络上。所以在这个优化的路径中，只有最后一步将数据拷贝到网卡缓存中是需要的。</p>
<h2 id="消息的存储原理"><a href="#消息的存储原理" class="headerlink" title="消息的存储原理"></a>消息的存储原理</h2><h3 id="消息的文件存储机制"><a href="#消息的文件存储机制" class="headerlink" title="消息的文件存储机制"></a>消息的文件存储机制</h3><p>kafka 是通过分段的方式将 Log 分为多个 LogSegment，LogSegment 是一个逻辑上的概念，一个 LogSegment 对应磁盘上的一个日志文件和一个索引文件，其中日志文件是用来记录消息的，索引文件是用来保存消息的索引。</p>
<h4 id="LogSegment"><a href="#LogSegment" class="headerlink" title="LogSegment"></a>LogSegment</h4><p>kafka 以 segment 为单位又把 partition 进行细分。每个 partition 相当于一个巨型文件被平均分配到多个大小相等的segment数据文件中<br>（每个 segment 文件中的消息不一定相等），这种特性方便已经被消费的消息的清理，提高磁盘的利用率。</p>
<blockquote>
<p>log.segment.bytes=107370 (设置分段大小),默认是1GB，</p>
</blockquote>
<p>segment file 由 2 大部分组成，分别为 index file 和 data file，此 2 个文件一一对应，成对出现，后缀”.index”和”.log”分别表示为 segment 索引文件、数据文件。<br>segment 文件命名规则：partion 全局的第一个 segment从 0 开始，后续每个 segment 文件名为上一个 segment文件最后一条消息的 offset 值进行递增。<br>数值最大为 64 位long 大小，20 位数字字符长度，没有数字用 0 填充<br>通过下面这条命令可以看到 kafka 消息日志的内容</p>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><span class="line">1</span><br></pre></td><td class="code"><pre><span class="line">sh kafka-run-class.sh kafka.tools.DumpLogSegments --files &#x2F;tmp&#x2F;kafka-logs&#x2F;test0&#x2F;00000000000000000000.log --print-data-log</span><br></pre></td></tr></table></figure>

<p>segment 中 index 和 log 的对应关系</p>
<p>为了提高查找消息的性能，为每一个日志文件添加 2 个索引索引文件：OffsetIndex 和 TimeIndex，分别对应*.index以及*.timeindex,TimeIndex 索引文件格式：它是映射时间戳和相对 offset<br>例如下图所示<br><img src="https://blog-dxysun.nos-eastchina1.126.net/kafka-index.png" alt="index"><br>如图所示，index 中存储了索引以及物理偏移量。 log 存储了消息的内容。索引文件的元数据执行对应数据文件中message 的物理偏移地址。举个简单的案例来说，以[4053,80899]为例，在 log 文件中，对应的是第4053条记录，物理偏移量（position）为 80899. position 是ByteBuffer的指针位置</p>
<p>在 partition 中通过 offset 查找 message</p>
<ol>
<li>根据 offset 的值，查找 segment 段中的 index 索引文件。由于索引文件命名是以上一个文件的最后一个offset 进行命名的，所以，使用二分查找算法能够根据offset 快速定位到指定的索引文件</li>
<li>找到索引文件后，根据 offset 进行定位，找到索引文件中的符合范围的索引。（kafka 采用稀疏索引的方式来提高查找性能）</li>
<li>得到 position 以后，再到对应的 log 文件中，从 position 出发开始查找 offset 对应的消息，将每条消息的offset与目标 offset 进行比较，直到找到消息<blockquote>
<p>例如，要查找 offset=2490 这条消息，那么先找到00000000000000000000.index, 然后找到[2487,49111]这个索引，再到 log 文件中，根据 49111 这个 position 开始查找，比较每条消息的 offset 是否大于等于 2490，最后查找到对应的消息以后返回。</p>
</blockquote>
</li>
</ol>
<p>Log 文件的消息内容分析</p>
<p>前面通过kafka提供的命令，可以查看二进制的日志文件信息，一条消息，会包含很多的字段。<br>例如</p>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><span class="line">1</span><br></pre></td><td class="code"><pre><span class="line">offset: 5371 position: 102124 CreateTime: 1531477349286 isvalid: true keysize: -1 valuesize: 12 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] payload: message_5371</span><br></pre></td></tr></table></figure>
<blockquote>
<p>createTime 表示创建时间、keysize 和 valuesize 表示 key 和 value 的大小、 compresscodec表示压缩编码、payload表示消息的具体内容</p>
</blockquote>
 
      <!-- reward -->
      
      <div id="reword-out">
        <div id="reward-btn">
          打赏
        </div>
      </div>
      
    </div>
    

    <!-- copyright -->
    
    <footer class="article-footer">
       
<div class="share-btn">
      <span class="share-sns share-outer">
        <i class="ri-share-forward-line"></i>
        分享
      </span>
      <div class="share-wrap">
        <i class="arrow"></i>
        <div class="share-icons">
          
          <a class="weibo share-sns" href="javascript:;" data-type="weibo">
            <i class="ri-weibo-fill"></i>
          </a>
          <a class="weixin share-sns wxFab" href="javascript:;" data-type="weixin">
            <i class="ri-wechat-fill"></i>
          </a>
          <a class="qq share-sns" href="javascript:;" data-type="qq">
            <i class="ri-qq-fill"></i>
          </a>
          <a class="douban share-sns" href="javascript:;" data-type="douban">
            <i class="ri-douban-line"></i>
          </a>
          <!-- <a class="qzone share-sns" href="javascript:;" data-type="qzone">
            <i class="icon icon-qzone"></i>
          </a> -->
          
          <a class="facebook share-sns" href="javascript:;" data-type="facebook">
            <i class="ri-facebook-circle-fill"></i>
          </a>
          <a class="twitter share-sns" href="javascript:;" data-type="twitter">
            <i class="ri-twitter-fill"></i>
          </a>
          <a class="google share-sns" href="javascript:;" data-type="google">
            <i class="ri-google-fill"></i>
          </a>
        </div>
      </div>
</div>

<div class="wx-share-modal">
    <a class="modal-close" href="javascript:;"><i class="ri-close-circle-line"></i></a>
    <p>扫一扫，分享到微信</p>
    <div class="wx-qrcode">
      <img src="//api.qrserver.com/v1/create-qr-code/?size=150x150&data=https://dxysun.com/2019/10/27/javaForKafka2/" alt="微信分享二维码">
    </div>
</div>

<div id="share-mask"></div>  
  <ul class="article-tag-list" itemprop="keywords"><li class="article-tag-list-item"><a class="article-tag-list-link" href="/tags/java/" rel="tag">java</a></li><li class="article-tag-list-item"><a class="article-tag-list-link" href="/tags/kafka/" rel="tag">kafka</a></li></ul>

    </footer>
  </div>

   
  <nav class="article-nav">
    
      <a href="/2019/10/27/javaForKafka3/" class="article-nav-link">
        <strong class="article-nav-caption">上一篇</strong>
        <div class="article-nav-title">
          
            kafka学习笔记（三）
          
        </div>
      </a>
    
    
      <a href="/2019/10/19/javaForKafka/" class="article-nav-link">
        <strong class="article-nav-caption">下一篇</strong>
        <div class="article-nav-title">kafka学习笔记（一）</div>
      </a>
    
  </nav>

  
   
  
</article>

</section>
      <footer class="footer">
  <div class="outer">
    <ul>
      <li>
        Copyrights &copy;
        2015-2024
        <i class="ri-heart-fill heart_icon"></i> dxysun
      </li>
    </ul>
    <ul>
      <li>
        
        
        
        由 <a href="https://hexo.io" target="_blank">Hexo</a> 强力驱动
        <span class="division">|</span>
        主题 - <a href="https://github.com/Shen-Yu/hexo-theme-ayer" target="_blank">Ayer</a>
        
      </li>
    </ul>
    <ul>
      <li>
        
        
        <span>
  <span><i class="ri-user-3-fill"></i>访问人数:<span id="busuanzi_value_site_uv"></span></s>
  <span class="division">|</span>
  <span><i class="ri-eye-fill"></i>浏览次数:<span id="busuanzi_value_page_pv"></span></span>
</span>
        
      </li>
    </ul>
    <ul>
      
        <li>
          <a href="https://beian.miit.gov.cn" target="_black" rel="nofollow">豫ICP备17012675号-1</a>
        </li>
        
    </ul>
    <ul>
      
    </ul>
    <ul>
      <li>
        <!-- cnzz统计 -->
        
      </li>
    </ul>
  </div>
</footer>
      <div class="float_btns">
        <div class="totop" id="totop">
  <i class="ri-arrow-up-line"></i>
</div>

<div class="todark" id="todark">
  <i class="ri-moon-line"></i>
</div>

      </div>
    </main>
    <aside class="sidebar on">
      <button class="navbar-toggle"></button>
<nav class="navbar">
  
  <div class="logo">
    <a href="/"><img src="https://dxysun.com/static/logo.png" alt="迎着朝阳"></a>
  </div>
  
  <ul class="nav nav-main">
    
    <li class="nav-item">
      <a class="nav-item-link" href="/">主页</a>
    </li>
    
    <li class="nav-item">
      <a class="nav-item-link" href="/archives">归档</a>
    </li>
    
    <li class="nav-item">
      <a class="nav-item-link" href="/categories">分类</a>
    </li>
    
    <li class="nav-item">
      <a class="nav-item-link" href="/tags">标签</a>
    </li>
    
    <li class="nav-item">
      <a class="nav-item-link" href="/photos">相册</a>
    </li>
    
    <li class="nav-item">
      <a class="nav-item-link" href="/friends">友链</a>
    </li>
    
    <li class="nav-item">
      <a class="nav-item-link" href="/about">关于我</a>
    </li>
    
  </ul>
</nav>
<nav class="navbar navbar-bottom">
  <ul class="nav">
    <li class="nav-item">
      
      <a class="nav-item-link nav-item-search"  title="搜索">
        <i class="ri-search-line"></i>
      </a>
      
      
      <a class="nav-item-link" target="_blank" href="/atom.xml" title="RSS Feed">
        <i class="ri-rss-line"></i>
      </a>
      
    </li>
  </ul>
</nav>
<div class="search-form-wrap">
  <div class="local-search local-search-plugin">
  <input type="search" id="local-search-input" class="local-search-input" placeholder="Search...">
  <div id="local-search-result" class="local-search-result"></div>
</div>
</div>
    </aside>
    <script>
      if (window.matchMedia("(max-width: 768px)").matches) {
        document.querySelector('.content').classList.remove('on');
        document.querySelector('.sidebar').classList.remove('on');
      }
    </script>
    <div id="mask"></div>

<!-- #reward -->
<div id="reward">
  <span class="close"><i class="ri-close-line"></i></span>
  <p class="reward-p"><i class="ri-cup-line"></i>请我喝杯咖啡吧~</p>
  <div class="reward-box">
    
    <div class="reward-item">
      <img class="reward-img" src="https://tu.dxysun.com/alipay-20201219151322.jpg">
      <span class="reward-type">支付宝</span>
    </div>
    
    
    <div class="reward-item">
      <img class="reward-img" src="https://tu.dxysun.com/weixin-20201219151346.png">
      <span class="reward-type">微信</span>
    </div>
    
  </div>
</div>
    
<script src="/js/jquery-2.0.3.min.js"></script>


<script src="/js/lazyload.min.js"></script>

<!-- Tocbot -->


<script src="/js/tocbot.min.js"></script>

<script>
  tocbot.init({
    tocSelector: '.tocbot',
    contentSelector: '.article-entry',
    headingSelector: 'h1, h2, h3, h4, h5, h6',
    hasInnerContainers: true,
    scrollSmooth: true,
    scrollContainer: 'main',
    positionFixedSelector: '.tocbot',
    positionFixedClass: 'is-position-fixed',
    fixedSidebarOffset: 'auto'
  });
</script>

<script src="https://cdn.jsdelivr.net/npm/jquery-modal@0.9.2/jquery.modal.min.js"></script>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/jquery-modal@0.9.2/jquery.modal.min.css">
<script src="https://cdn.jsdelivr.net/npm/justifiedGallery@3.7.0/dist/js/jquery.justifiedGallery.min.js"></script>

<script src="/dist/main.js"></script>

<!-- ImageViewer -->

<!-- Root element of PhotoSwipe. Must have class pswp. -->
<div class="pswp" tabindex="-1" role="dialog" aria-hidden="true">

    <!-- Background of PhotoSwipe. 
         It's a separate element as animating opacity is faster than rgba(). -->
    <div class="pswp__bg"></div>

    <!-- Slides wrapper with overflow:hidden. -->
    <div class="pswp__scroll-wrap">

        <!-- Container that holds slides. 
            PhotoSwipe keeps only 3 of them in the DOM to save memory.
            Don't modify these 3 pswp__item elements, data is added later on. -->
        <div class="pswp__container">
            <div class="pswp__item"></div>
            <div class="pswp__item"></div>
            <div class="pswp__item"></div>
        </div>

        <!-- Default (PhotoSwipeUI_Default) interface on top of sliding area. Can be changed. -->
        <div class="pswp__ui pswp__ui--hidden">

            <div class="pswp__top-bar">

                <!--  Controls are self-explanatory. Order can be changed. -->

                <div class="pswp__counter"></div>

                <button class="pswp__button pswp__button--close" title="Close (Esc)"></button>

                <button class="pswp__button pswp__button--share" style="display:none" title="Share"></button>

                <button class="pswp__button pswp__button--fs" title="Toggle fullscreen"></button>

                <button class="pswp__button pswp__button--zoom" title="Zoom in/out"></button>

                <!-- Preloader demo http://codepen.io/dimsemenov/pen/yyBWoR -->
                <!-- element will get class pswp__preloader--active when preloader is running -->
                <div class="pswp__preloader">
                    <div class="pswp__preloader__icn">
                        <div class="pswp__preloader__cut">
                            <div class="pswp__preloader__donut"></div>
                        </div>
                    </div>
                </div>
            </div>

            <div class="pswp__share-modal pswp__share-modal--hidden pswp__single-tap">
                <div class="pswp__share-tooltip"></div>
            </div>

            <button class="pswp__button pswp__button--arrow--left" title="Previous (arrow left)">
            </button>

            <button class="pswp__button pswp__button--arrow--right" title="Next (arrow right)">
            </button>

            <div class="pswp__caption">
                <div class="pswp__caption__center"></div>
            </div>

        </div>

    </div>

</div>

<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/photoswipe@4.1.3/dist/photoswipe.min.css">
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/photoswipe@4.1.3/dist/default-skin/default-skin.min.css">
<script src="https://cdn.jsdelivr.net/npm/photoswipe@4.1.3/dist/photoswipe.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/photoswipe@4.1.3/dist/photoswipe-ui-default.min.js"></script>

<script>
    function viewer_init() {
        let pswpElement = document.querySelectorAll('.pswp')[0];
        let $imgArr = document.querySelectorAll(('.article-entry img:not(.reward-img)'))

        $imgArr.forEach(($em, i) => {
            $em.onclick = () => {
                // slider展开状态
                // todo: 这样不好，后面改成状态
                if (document.querySelector('.left-col.show')) return
                let items = []
                $imgArr.forEach(($em2, i2) => {
                    let img = $em2.getAttribute('data-idx', i2)
                    let src = $em2.getAttribute('data-target') || $em2.getAttribute('src')
                    let title = $em2.getAttribute('alt')
                    // 获得原图尺寸
                    const image = new Image()
                    image.src = src
                    items.push({
                        src: src,
                        w: image.width || $em2.width,
                        h: image.height || $em2.height,
                        title: title
                    })
                })
                var gallery = new PhotoSwipe(pswpElement, PhotoSwipeUI_Default, items, {
                    index: parseInt(i)
                });
                gallery.init()
            }
        })
    }
    viewer_init()
</script>

<!-- MathJax -->

<script type="text/x-mathjax-config">
  MathJax.Hub.Config({
      tex2jax: {
          inlineMath: [ ['$','$'], ["\\(","\\)"]  ],
          processEscapes: true,
          skipTags: ['script', 'noscript', 'style', 'textarea', 'pre', 'code']
      }
  });

  MathJax.Hub.Queue(function() {
      var all = MathJax.Hub.getAllJax(), i;
      for(i=0; i < all.length; i += 1) {
          all[i].SourceElement().parentNode.className += ' has-jax';
      }
  });
</script>

<script src="https://cdn.jsdelivr.net/npm/mathjax@2.7.6/unpacked/MathJax.js?config=TeX-AMS-MML_HTMLorMML"></script>
<script>
  var ayerConfig = {
    mathjax: true
  }
</script>

<!-- Katex -->

<!-- busuanzi  -->


<script src="/js/busuanzi-2.3.pure.min.js"></script>


<!-- ClickLove -->

<!-- ClickBoom1 -->

<!-- ClickBoom2 -->

<!-- CodeCopy -->


<link rel="stylesheet" href="/css/clipboard.css">

<script src="https://cdn.jsdelivr.net/npm/clipboard@2/dist/clipboard.min.js"></script>
<script>
  function wait(callback, seconds) {
    var timelag = null;
    timelag = window.setTimeout(callback, seconds);
  }
  !function (e, t, a) {
    var initCopyCode = function(){
      var copyHtml = '';
      copyHtml += '<button class="btn-copy" data-clipboard-snippet="">';
      copyHtml += '<i class="ri-file-copy-2-line"></i><span>COPY</span>';
      copyHtml += '</button>';
      $(".highlight .code pre").before(copyHtml);
      $(".article pre code").before(copyHtml);
      var clipboard = new ClipboardJS('.btn-copy', {
        target: function(trigger) {
          return trigger.nextElementSibling;
        }
      });
      clipboard.on('success', function(e) {
        let $btn = $(e.trigger);
        $btn.addClass('copied');
        let $icon = $($btn.find('i'));
        $icon.removeClass('ri-file-copy-2-line');
        $icon.addClass('ri-checkbox-circle-line');
        let $span = $($btn.find('span'));
        $span[0].innerText = 'COPIED';
        
        wait(function () { // 等待两秒钟后恢复
          $icon.removeClass('ri-checkbox-circle-line');
          $icon.addClass('ri-file-copy-2-line');
          $span[0].innerText = 'COPY';
        }, 2000);
      });
      clipboard.on('error', function(e) {
        e.clearSelection();
        let $btn = $(e.trigger);
        $btn.addClass('copy-failed');
        let $icon = $($btn.find('i'));
        $icon.removeClass('ri-file-copy-2-line');
        $icon.addClass('ri-time-line');
        let $span = $($btn.find('span'));
        $span[0].innerText = 'COPY FAILED';
        
        wait(function () { // 等待两秒钟后恢复
          $icon.removeClass('ri-time-line');
          $icon.addClass('ri-file-copy-2-line');
          $span[0].innerText = 'COPY';
        }, 2000);
      });
    }
    initCopyCode();
  }(window, document);
</script>


<!-- CanvasBackground -->


<script src="/js/dz.js"></script>



    
  </div>
</body>

</html>